package drds.data_propagate.parse;

import drds.data_propagate.binlog_event.BinLogEvent;
import drds.data_propagate.binlog_event.BinLogEventDecoder;
import drds.data_propagate.binlog_event.BinLogEventsContext;
import drds.data_propagate.binlog_event.Buffer;
import drds.data_propagate.binlog_event.event.FormatDescriptionEvent;
import drds.data_propagate.driver.Connector;
import drds.data_propagate.driver.QueryExecutor;
import drds.data_propagate.driver.UpdateExecutor;
import drds.data_propagate.driver.packets.GtidSet;
import drds.data_propagate.driver.packets.GtidSetImpl;
import drds.data_propagate.driver.packets.HeaderPacket;
import drds.data_propagate.driver.packets.client.command_packet.BinlogDumpGtidPacket;
import drds.data_propagate.driver.packets.client.command_packet.BinlogDumpPacket;
import drds.data_propagate.driver.packets.client.command_packet.RegisterSlavePacket;
import drds.data_propagate.driver.packets.client.command_packet.SemiAckPacket;
import drds.data_propagate.driver.packets.server.ErrorPacket;
import drds.data_propagate.driver.packets.server.ResultSetPacket;
import drds.data_propagate.driver.utils.PacketManager;
import drds.data_propagate.parse.exception.ParseException;
import drds.data_propagate.parse.multistage_coordinator.MultistageCoordinator;
import drds.data_propagate.parse.multistage_coordinator.MultistageCoordinatorImpl;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * {@link Dumper} implementation that drives a binlog dump over a {@link Connector}.
 *
 * <p>It can run plain queries/updates on the connection, prepare the session for replication
 * ({@link #updateSettings()}), send the register-slave / binlog-dump / binlog-dump-gtid / semi-ack
 * command packets, and then either decode the received events and hand them to a {@link Sink}, or
 * publish the raw event buffers to a {@link MultistageCoordinator}.
 *
 * <p>Once a dump command has been sent the connector is flagged as dumping
 * ({@code connector.setDumping(true)}).
 */
public class DumperImpl implements Dumper {

    private static final Logger logger = LoggerFactory.getLogger(DumperImpl.class);
    /**
     * Connection timeout in milliseconds; defaults to 5 seconds. Not read anywhere in this class.
     */
    @Setter
    @Getter
    protected int connTimeout = 5 * 1000;
    /**
     * Socket timeout in milliseconds; defaults to 1 hour. Not read anywhere in this class.
     */
    @Setter
    @Getter
    protected int soTimeout = 60 * 60 * 1000;
    @Setter
    @Getter
    private Connector connector;
    /**
     * Server id this dumper presents in the register-slave and binlog-dump packets.
     */
    @Setter
    @Getter
    private long slaveId;
    @Setter
    @Getter
    private Charset charset = Charset.forName("UTF-8");
    /**
     * Lazily loaded by {@link #getBinlogFormat()}.
     */
    @Setter
    @Getter
    private BinlogFormat binlogFormat;
    /**
     * Lazily loaded by {@link #getBinlogImage()}.
     */
    @Setter
    @Getter
    private BinlogImage binlogImage;
    // table meta data related
    @Setter
    @Getter
    private AuthenticationInfo authenticationInfo;
    /**
     * Checksum algorithm of the server's binlog; refreshed by {@link #queryBinlogCheckSum()} before
     * every seek/dump.
     */
    @Setter
    @Getter
    private int binlogChecksum = BinLogEvent.binlog_checksum_alg_off;


    /**
     * Creates an unconfigured dumper; connector and authentication info must be set before use.
     */
    public DumperImpl() {
    }

    /**
     * Creates a dumper with a connector for the given address and credentials.
     */
    public DumperImpl(InetSocketAddress address, String username, String password) {
        authenticationInfo = newAuthenticationInfo(address, username, password);
        connector = new Connector(address, username, password);
    }

    /**
     * Creates a dumper with a connector for the given address, credentials, charset number and
     * default schema.
     */
    public DumperImpl(InetSocketAddress address, String username, String password, byte charsetNumber,
                      String defaultSchema) {
        authenticationInfo = newAuthenticationInfo(address, username, password);
        authenticationInfo.setDefaultDatabaseName(defaultSchema);
        connector = new Connector(address, username, password, charsetNumber, defaultSchema);
    }

    /**
     * Builds the authentication info shared by both convenience constructors.
     */
    private static AuthenticationInfo newAuthenticationInfo(InetSocketAddress address, String username,
                                                            String password) {
        AuthenticationInfo info = new AuthenticationInfo();
        info.setInetSocketAddress(address);
        info.setUsername(username);
        info.setPassword(password);
        return info;
    }

    public void connect() throws IOException {
        connector.connect();
    }

    public void reconnect() throws IOException {
        connector.reconnect();
    }

    public void disconnect() throws IOException {
        connector.disconnect();
    }

    public boolean isConnected() {
        return connector.isConnected();
    }

    /**
     * Runs a query on the underlying connector and returns its single result set.
     */
    public ResultSetPacket query(String queryString) throws IOException {
        QueryExecutor queryExecutor = new QueryExecutor(connector);
        return queryExecutor.query(queryString);
    }

    /**
     * Runs a query on the underlying connector and returns all result sets it produced.
     */
    public List<ResultSetPacket> querys(String queryString) throws IOException {
        QueryExecutor queryExecutor = new QueryExecutor(connector);
        return queryExecutor.querys(queryString);
    }

    /**
     * Runs an update statement on the underlying connector.
     */
    public void update(String queryString) throws IOException {
        UpdateExecutor updateExecutor = new UpdateExecutor(connector);
        updateExecutor.update(queryString);
    }

    /**
     * Speeds up position lookup during a master/standby switch by decoding only a handful of event
     * types (rotate, format description, query, xid, and gtid when a gtid is supplied), e.g. only
     * transaction heads or tails.
     *
     * @param binlogFileName binlog file to start from
     * @param binlogPosition position inside that file
     * @param gtid           optional gtid set string; when non-empty it seeds the decoding context
     * @param sink           receives every decoded event; returning {@code false} stops the seek
     * @throws IOException    on network failure
     * @throws ParseException when an event cannot be decoded
     */
    public void seek(String binlogFileName, Long binlogPosition, String gtid, Sink sink)
            throws IOException {
        updateSettings();
        queryBinlogCheckSum();
        sendBinlogDumpPacket(binlogFileName, binlogPosition);
        PacketReader packetReader = new PacketReader(connector.getReceiveBufferSize());
        // Release the reader on every exit path, like the dump variants do.
        try {
            packetReader.start(connector.getSocketChannel());
            BinLogEventDecoder binLogEventDecoder = new BinLogEventDecoder();
            binLogEventDecoder.handle(BinLogEvent.rotate_event);
            binLogEventDecoder.handle(BinLogEvent.format_description_event);
            binLogEventDecoder.handle(BinLogEvent.query_event);
            binLogEventDecoder.handle(BinLogEvent.xid_event);
            BinLogEventsContext binLogEventsContext = new BinLogEventsContext();
            // If the entry position carries a gtid, use the passed-in gtid as the base gtidSet to
            // append to; otherwise gtids get lost when gtid and tsdb are both enabled.
            // When the source database has purged gtids, an error similar to the following shows up:
            // 'errno = 1236, sqlstate = HY000 errmsg = The slave is connecting
            // using CHANGE MASTER TO MASTER_AUTO_POSITION = 1 ...
            if (StringUtils.isNotEmpty(gtid)) {
                binLogEventDecoder.handle(BinLogEvent.gtid_log_event);
                binLogEventsContext.setGtidSet(GtidSetImpl.parse(gtid));
            }
            binLogEventsContext.setFormatDescriptionEvent(new FormatDescriptionEvent(4, binlogChecksum));
            while (packetReader.fetchNextBinlogEvent()) {
                BinLogEvent binLogEvent = decodeOrFail(binLogEventDecoder, packetReader, binLogEventsContext);
                if (!sink.sink(binLogEvent)) {
                    break;
                }
            }
        } finally {
            packetReader.close();
        }
    }

    /**
     * Registers as a slave, starts a file/position based dump and feeds every decoded event to the
     * sink. Events whose semi value is 1 are acknowledged with a semi-ack packet.
     *
     * @param sink           receives every decoded event; returning {@code false} stops the dump
     * @param binlogFileName binlog file to start from
     * @param binlogPosition position inside that file
     * @throws IOException    on network failure
     * @throws ParseException when an event cannot be decoded
     */
    public void dump(Sink sink, String binlogFileName, Long binlogPosition) throws IOException {
        updateSettings();
        queryBinlogCheckSum();
        sendRegisterSlavePacket();
        sendBinlogDumpPacket(binlogFileName, binlogPosition);
        PacketReader packetReader = new PacketReader(connector.getReceiveBufferSize());
        // Release the reader on every exit path, like the other dump variants do.
        try {
            packetReader.start(connector.getSocketChannel());
            BinLogEventDecoder binLogEventDecoder = new BinLogEventDecoder(BinLogEvent.unknown_event, BinLogEvent.enum_end_event);
            BinLogEventsContext binLogEventsContext = new BinLogEventsContext();
            binLogEventsContext.setFormatDescriptionEvent(new FormatDescriptionEvent(4, binlogChecksum));
            while (packetReader.fetchNextBinlogEvent()) {
                BinLogEvent binLogEvent = decodeOrFail(binLogEventDecoder, packetReader, binLogEventsContext);
                if (!sink.sink(binLogEvent)) {
                    break;
                }
                if (binLogEvent.getSemiValue() == 1) {
                    sendSemiAckPacket(binLogEventsContext.getBinlogEventPosition().getFileName(), binLogEventsContext.getBinlogEventPosition().getPosition());
                }
            }
        } finally {
            packetReader.close();
        }
    }

    /**
     * Starts a gtid based dump and feeds every decoded event to the sink.
     *
     * @param sink    receives every decoded event; returning {@code false} stops the dump
     * @param gtidSet gtid set sent to the server and handed to the decoding context
     * @throws IOException    on network failure
     * @throws ParseException when an event cannot be decoded
     */
    @Override
    public void dump(Sink sink, GtidSet gtidSet) throws IOException {
        updateSettings();
        queryBinlogCheckSum();
        sendBinlogDumpGtidPacket(gtidSet);

        PacketReader packetReader = new PacketReader(connector.getReceiveBufferSize());
        try {
            packetReader.start(connector.getSocketChannel());
            BinLogEventDecoder binLogEventDecoder = new BinLogEventDecoder(BinLogEvent.unknown_event, BinLogEvent.enum_end_event);
            BinLogEventsContext binLogEventsContext = new BinLogEventsContext();
            binLogEventsContext.setFormatDescriptionEvent(new FormatDescriptionEvent(4, binlogChecksum));
            // fix bug: #890 pass the gtid set into the context so that decode can use it
            binLogEventsContext.setGtidSet(gtidSet);
            while (packetReader.fetchNextBinlogEvent()) {
                BinLogEvent binLogEvent = decodeOrFail(binLogEventDecoder, packetReader, binLogEventsContext);
                if (!sink.sink(binLogEvent)) {
                    break;
                }
            }
        } finally {
            packetReader.close();
        }
    }

    /**
     * Timestamp based dump; not implemented.
     *
     * @throws NullPointerException always.
     *                              NOTE(review): UnsupportedOperationException would be the
     *                              conventional type; kept as-is so existing callers are unaffected.
     */
    public void dump(Sink sink, long timestamp) throws IOException {
        throw new NullPointerException("Not implement yet");
    }

    /**
     * Registers as a slave, starts a file/position based dump and publishes every raw event buffer
     * to the coordinator.
     *
     * @param multistageCoordinator must be a {@link MultistageCoordinatorImpl} (it is cast to one);
     *                              returning {@code false} from publish stops the dump
     * @param binlogFileName        binlog file to start from
     * @param binlogPosition        position inside that file
     * @throws IOException on network failure
     */
    @Override
    public void dump(MultistageCoordinator multistageCoordinator, String binlogFileName, Long binlogPosition)
            throws IOException {
        updateSettings();
        queryBinlogCheckSum();
        sendRegisterSlavePacket();
        sendBinlogDumpPacket(binlogFileName, binlogPosition);
        publishRawEvents(multistageCoordinator);
    }

    /**
     * Timestamp based dump; not implemented.
     *
     * @throws NullPointerException always.
     *                              NOTE(review): UnsupportedOperationException would be the
     *                              conventional type; kept as-is so existing callers are unaffected.
     */
    @Override
    public void dump(MultistageCoordinator multistageCoordinator, long timestamp) throws IOException {
        throw new NullPointerException("Not implement yet");
    }

    /**
     * Starts a gtid based dump and publishes every raw event buffer to the coordinator.
     *
     * @param multistageCoordinator must be a {@link MultistageCoordinatorImpl} (it is cast to one);
     *                              returning {@code false} from publish stops the dump
     * @param gtidSet               gtid set sent to the server
     * @throws IOException on network failure
     */
    @Override
    public void dump(MultistageCoordinator multistageCoordinator, GtidSet gtidSet) throws IOException {
        updateSettings();
        queryBinlogCheckSum();
        sendBinlogDumpGtidPacket(gtidSet);
        publishRawEvents(multistageCoordinator);
    }

    /**
     * Decodes the event the reader just fetched, failing loudly when the decoder yields nothing.
     */
    private static BinLogEvent decodeOrFail(BinLogEventDecoder binLogEventDecoder, PacketReader packetReader,
                                            BinLogEventsContext binLogEventsContext) throws IOException {
        BinLogEvent binLogEvent = binLogEventDecoder.decode(packetReader, binLogEventsContext);
        if (binLogEvent == null) {
            throw new ParseException("parse failed");
        }
        return binLogEvent;
    }

    /**
     * Hands this dumper and the current checksum to the coordinator, then publishes each fetched
     * event as a raw buffer until the stream ends or the coordinator declines a buffer.
     */
    private void publishRawEvents(MultistageCoordinator multistageCoordinator) throws IOException {
        ((MultistageCoordinatorImpl) multistageCoordinator).setDumper(this);
        ((MultistageCoordinatorImpl) multistageCoordinator).setBinlogChecksum(binlogChecksum);
        PacketReader packetReader = new PacketReader(connector.getReceiveBufferSize());
        try {
            packetReader.start(connector.getSocketChannel());
            while (packetReader.fetchNextBinlogEvent()) {
                Buffer buffer = packetReader.duplicateFromEffectiveInitialIndexOffset();
                // Mark the whole fetched event as consumed before handing the copy over.
                packetReader.moveEffectiveInitialIndexAndLimit(packetReader.limit());
                if (!multistageCoordinator.publish(buffer)) {
                    break;
                }
            }
        } finally {
            packetReader.close();
        }
    }

    /**
     * Writes one command packet (header with sequence number 0 followed by the body) to the
     * connector's socket channel.
     */
    private void writeCommandPacket(byte[] packetBodyBytes) throws IOException {
        HeaderPacket headerPacket = new HeaderPacket();
        headerPacket.setPacketBodyLength(packetBodyBytes.length);
        headerPacket.setPacketSequenceNumber((byte) 0x00);
        PacketManager.writePackets(connector.getSocketChannel(), headerPacket.encode(), packetBodyBytes);
    }

    /**
     * Sends the register-slave command, announcing the local socket address, the configured
     * credentials and {@link #slaveId}, then reads and validates the server response.
     *
     * @throws IOException on network failure, on an empty response, or when the server answers with
     *                     an error packet or another unexpected packet
     */
    private void sendRegisterSlavePacket() throws IOException {
        RegisterSlavePacket registerSlavePacket = new RegisterSlavePacket();
        SocketAddress socketAddress = connector.getSocketChannel().getLocalSocketAddress();
        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
        registerSlavePacket.host = inetSocketAddress.getHostString();
        registerSlavePacket.port = inetSocketAddress.getPort();
        registerSlavePacket.username = authenticationInfo.getUsername();
        registerSlavePacket.password = authenticationInfo.getPassword();
        registerSlavePacket.serverId = this.slaveId;
        byte[] packetBodyBytes = registerSlavePacket.encode();

        // NOTE(review): the packet holds the password; confirm its toString() does not print it.
        logger.info("Register slave {}", registerSlavePacket);

        writeCommandPacket(packetBodyBytes);

        HeaderPacket headerPacket = PacketManager.readHeader(connector.getSocketChannel(), 4);
        packetBodyBytes = PacketManager.readBytes(connector.getSocketChannel(), headerPacket.getPacketBodyLength());
        // Explicit check instead of an assert: assertions are usually disabled at runtime.
        if (packetBodyBytes == null || packetBodyBytes.length == 0) {
            throw new IOException("empty response When doing Register slave");
        }
        // A negative first byte is either the error marker (-1, i.e. 0xFF) or something unexpected.
        if (packetBodyBytes[0] < 0) {
            if (packetBodyBytes[0] == -1) {
                ErrorPacket errorPacket = new ErrorPacket();
                errorPacket.decode(packetBodyBytes);
                throw new IOException("Error When doing Register slave:" + errorPacket.toString());
            } else {
                throw new IOException("unpexpected packet with field_count=" + packetBodyBytes[0]);
            }
        }
    }

    /**
     * Sends the binlog-dump command for the given file/position and flags the connector as dumping.
     */
    private void sendBinlogDumpPacket(String binlogFileName, Long binlogPosition) throws IOException {
        BinlogDumpPacket binlogDumpPacket = new BinlogDumpPacket();
        binlogDumpPacket.binlogFileName = binlogFileName;
        binlogDumpPacket.binlogPosition = binlogPosition;
        binlogDumpPacket.slaveServerId = this.slaveId;
        byte[] packetBodyBytes = binlogDumpPacket.encode();

        logger.info("COM_BINLOG_DUMP with readedIndex:{}", binlogDumpPacket);
        writeCommandPacket(packetBodyBytes);
        connector.setDumping(true);
    }

    /**
     * Sends a semi-ack packet for the given binlog file/position.
     *
     * @throws IOException on network failure
     */
    public void sendSemiAckPacket(String binlogFileName, Long binlogPosition) throws IOException {
        SemiAckPacket semiAckPacket = new SemiAckPacket();
        semiAckPacket.binlogFileName = binlogFileName;
        semiAckPacket.binlogPosition = binlogPosition;

        byte[] packetBodyBytes = semiAckPacket.encode();

        logger.info("SEMI ACK with readedIndex:{}", semiAckPacket);
        writeCommandPacket(packetBodyBytes);
    }

    /**
     * Sends the binlog-dump-gtid command for the given gtid set and flags the connector as dumping.
     */
    private void sendBinlogDumpGtidPacket(GtidSet gtidSet) throws IOException {
        BinlogDumpGtidPacket binlogDumpGtidPacket = new BinlogDumpGtidPacket();
        binlogDumpGtidPacket.slaveServerId = this.slaveId;
        binlogDumpGtidPacket.gtidSet = gtidSet;
        byte[] packetBodyBytes = binlogDumpGtidPacket.encode();

        logger.info("COM_BINLOG_DUMP_GTID:{}", binlogDumpGtidPacket);
        writeCommandPacket(packetBodyBytes);
        connector.setDumping(true);
    }

    /**
     * Creates a new dumper with a forked connector, sharing this dumper's charset, slave id and
     * authentication info. Timeouts and cached binlog settings are not copied.
     */
    public DumperImpl fork() {
        DumperImpl dumper = new DumperImpl();
        dumper.setCharset(getCharset());
        dumper.setSlaveId(getSlaveId());
        dumper.setConnector(connector.fork());
        // set authenticationInfo
        dumper.setAuthenticationInfo(authenticationInfo);
        return dumper;
    }

    /**
     * Reads the server's {@code server_id} variable.
     *
     * @return the server id, or 0 when the result does not have exactly two values
     * @throws IOException on network failure
     */
    @Override
    public long queryServerId() throws IOException {
        ResultSetPacket resultSetPacket = query("show variables like 'server_id'");
        List<String> valueList = resultSetPacket.getValueList();
        if (valueList == null || valueList.size() != 2) {
            return 0;
        }
        return NumberUtils.toLong(valueList.get(1));
    }

    // ====================== help method ====================

    /**
     * Runs one session-setup statement on a best-effort basis: failures are logged as warnings and
     * never propagated.
     *
     * @param sql                   statement to run
     * @param settingName           name used in the warning message
     * @param ignoreUnknownVariable when {@code true}, failures whose message contains
     *                              "Unknown system variable" are silently ignored
     */
    private void tryUpdate(String sql, String settingName, boolean ignoreUnknownVariable) {
        try {
            update(sql);
        } catch (Exception e) {
            if (ignoreUnknownVariable && StringUtils.contains(e.getMessage(), "Unknown system variable")) {
                return;
            }
            logger.warn("update " + settingName + " failed", e);
        }
    }

    /**
     * Prepares the session for a binlog dump. Every statement is best-effort. The settings that
     * are checked or set:<br>
     * <ol>
     * <li>wait_timeout</li>
     * <li>net_write_timeout</li>
     * <li>net_read_timeout</li>
     * <li>names (binary)</li>
     * <li>@master_binlog_checksum</li>
     * <li>@slave_uuid</li>
     * <li>@mariadb_slave_capability</li>
     * <li>@master_heartbeat_period</li>
     * </ol>
     *
     * @throws IOException declared for callers; the individual statements swallow their failures
     */
    private void updateSettings() throws IOException {
        tryUpdate("set wait_timeout=9999999", "wait_timeout", false);
        tryUpdate("set net_write_timeout=1800", "net_write_timeout", false);
        tryUpdate("set net_read_timeout=1800", "net_read_timeout", false);

        // Make the server return results without charset conversion: data is sent in the database's
        // binary encoding and the client converts it as needed.
        tryUpdate("set names 'binary'", "names", false);

        // MySQL 5.6 needs this session variable for checksum support. Without it the server fails
        // with: "Slave can not handle replication events with the checksum that master is
        // configured to log".
        // It must match the server's checksum configuration, otherwise RotateLogEvent is garbled.
        // @@global.binlog_checksum must not be wrapped in single quotes: doing so made the master
        // exit on MySQL 5.6.29.
        tryUpdate("set @master_binlog_checksum= @@global.binlog_checksum", "master_binlog_checksum", true);

        // See https://github.com/alibaba/canal/issues/284
        // MySQL 5.6 needs slave_uuid to be set so that the server does not kill the connection.
        tryUpdate("set @slave_uuid=uuid()", "slave_uuid", true);

        // MariaDB needs this session variable for its special event types.
        tryUpdate("SET @mariadb_slave_capability='" + BinLogEvent.maria_slave_capability_mine + "'",
                "mariadb_slave_capability", true);

        /**
         * MASTER_HEARTBEAT_PERIOD sets the interval in seconds between replication
         * heartbeats. Whenever the master's binary log is updated with an event, the
         * waiting period for the next heartbeat is reset. interval is a decimal value
         * having the range 0 to 4294967 seconds and a resolution in milliseconds; the
         * smallest nonzero value is 0.001. Heartbeats are sent by the master only if
         * there are no unsent events in the binary log file for a period longer than
         * interval.
         */
        // The session variable itself is given in nanoseconds.
        long periodNano = TimeUnit.SECONDS.toNanos(PacketReader.master_heartbeat_period_seconds);
        tryUpdate("SET @master_heartbeat_period=" + periodNano, "master_heartbeat_period", false);
    }

    /**
     * Loads the server's binlog format into {@link #binlogFormat}.
     *
     * @throws ParseException        when the query fails with an I/O error
     * @throws IllegalStateException when the result is malformed or names an unknown format
     */
    private void queryBinlogFormat() {
        ResultSetPacket resultSetPacket = null;
        try {
            resultSetPacket = query("show variables like 'binlog_format'");
        } catch (IOException e) {
            throw new ParseException(e);
        }

        List<String> valueList = resultSetPacket.getValueList();
        if (valueList == null || valueList.size() != 2) {
            logger.warn(
                    "unexpected binlog_event format queryString result, this may cause unexpected result, so throw exception decode request network decode io shutdown.");
            throw new IllegalStateException(
                    "unexpected binlog_event format queryString result:" + resultSetPacket.getValueList());
        }

        binlogFormat = BinlogFormat.valuesOf(valueList.get(1));
        if (binlogFormat == null) {
            throw new IllegalStateException(
                    "unexpected binlog_event format queryString result:" + resultSetPacket.getValueList());
        }
    }

    /**
     * Loads the server's binlog row image into {@link #binlogImage}. A malformed result falls back
     * to {@code FULL}.
     *
     * @throws ParseException        when the query fails with an I/O error
     * @throws IllegalStateException when the result names an unknown image
     */
    private void queryBinlogImage() {
        ResultSetPacket resultSetPacket = null;
        try {
            resultSetPacket = query("show variables like 'binlog_row_image'");
        } catch (IOException e) {
            throw new ParseException(e);
        }

        List<String> valueList = resultSetPacket.getValueList();
        if (valueList == null || valueList.size() != 2) {
            // Older server versions may not have the row-image feature.
            binlogImage = BinlogImage.FULL;
        } else {
            binlogImage = BinlogImage.valuesOf(valueList.get(1));
        }

        // Validate the value just loaded (the image, not the format).
        if (binlogImage == null) {
            throw new IllegalStateException(
                    "unexpected binlog_event image queryString result:" + resultSetPacket.getValueList());
        }
    }

    /**
     * Loads the master's checksum setting into {@link #binlogChecksum}; any failure or any value
     * other than CRC32 (case-insensitive) is treated as "off".
     *
     * <pre>
     * Unlike MySQL, MariaDB already applies the checksum logic to the first binlog event
     * (Rotate_Event), whereas MySQL only becomes aware of checksums after the second binlog event.
     * As a result, with MariaDB the binlog file name was parsed as garbage whenever checksums were on.
     * fixed issue : https://github.com/alibaba/canal/issues/1081
     * </pre>
     */
    private void queryBinlogCheckSum() {
        try {
            ResultSetPacket resultSetPacket = query("select @@global.binlog_checksum");
            List<String> valueList = resultSetPacket.getValueList();
            // Null-safe and locale-independent comparison.
            boolean crc32 = valueList != null && !valueList.isEmpty()
                    && StringUtils.equalsIgnoreCase(valueList.get(0), "CRC32");
            binlogChecksum = crc32 ? BinLogEvent.binlog_checksum_alg_crc32 : BinLogEvent.binlog_checksum_alg_off;
        } catch (Throwable e) {
            logger.error("query binlog_checksum failed, treating checksum as off", e);
            binlogChecksum = BinLogEvent.binlog_checksum_alg_off;
        }
    }


    /**
     * Returns the binlog format, querying the server on first use.
     */
    public BinlogFormat getBinlogFormat() {
        if (binlogFormat == null) {
            synchronized (this) {
                // Re-check under the lock so racing callers do not each query the server.
                if (binlogFormat == null) {
                    queryBinlogFormat();
                }
            }
        }

        return binlogFormat;
    }

    /**
     * Returns the binlog row image, querying the server on first use.
     */
    public BinlogImage getBinlogImage() {
        if (binlogImage == null) {
            synchronized (this) {
                // Re-check under the lock so racing callers do not each query the server.
                if (binlogImage == null) {
                    queryBinlogImage();
                }
            }
        }

        return binlogImage;
    }


}
